详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)

您所在的位置:网站首页 kafka 生产者配置 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)

详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)

2024-07-11 08:16| 来源: 网络整理| 查看: 265

1 缘起

学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka, 所以,本次是纯Java的方式操作Kafka, 构建生产者和消费者,本地部署Kafka环境,给出测试样例的测试结果, 同时,讲解部分通用的参数, 及给出通过命令行启动生产者和消费者的测试样例, 分享如下,帮助读者学习Kafka基础操作。

2 环境准备

下载kafka:https://download.csdn.net/download/Xin_101/19787459

2.1 启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties 2.2 启动kafka bin/kafka-server-start.sh config/server.properties 2.3 新建topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic

在这里插入图片描述

2.4 依赖 org.apache.kafka kafka-clients 3.2.0 3 Kafka通用配置

Kafka的生产者和消费者参数比较多,这里仅列出一些测试用的参数, 区分生产者和消费者。

3.1 生产者配置参数 序号参数描述1bootstrap.serversKafka主机2acks生产者:要求leader请求结束前收到的确认次数,来控制发送数据的持久化消息确认: 0:生产者不等待服务器确认,此时retry参数不生效1:leader写入记录到log,不会等待follower的确认即向生产者发送通知all:leader等待所有副本通知,然后向生产者发送通知,保证所有数据落盘到所有副本,功能同设置为-13retries生产者重试次数4batch.size生产者:向同一分区发送打包发送的数据量,单位:bytes,默认16384bytes=16K5linger.ms生产者:批量发送消息的间隔时间(延迟时间),单位:毫秒6buffer.memory生产者:可以使用的最大缓存空间,单位:bytes,默认33554432bytes=32M7key.serializer生产者:键编码器8value.serializer生产者:值编码器 3.2 消费者配置参数 序号参数描述1bootstrap.serversKafka主机2group.id消费者:消费topic的组ID3enable.auto.commit消费者:后台定期提交offset4auto.commit.interval.ms消费者提交offset的时间间隔:单位:毫秒,当enable.auto.commit为true时生效5auto.offset.reset消费者:重启后配置offsetearliest:消费者恢复到当前topic最早的offset latest:消费者从最新的offset开始消费none:如果消费者组没找到之前的offset抛出异常其他任何值都会抛出异常6key.deserializer消费者:键解码器7value.deserializer消费者:值解码器 3.3 Kafka通用参数封装

由于参数众多,这里封装了一个Kafka通用参数类,给了默认值, 本地测试,直接使用默认参数, 同时给出了有参构造器,自定义参数, 代码样例如下。

package com.monkey.java_study.mq.kafka; import java.util.Collection; import java.util.Collections; /** * Kafka通用配置. * * @author xindaqi * @since 2022-08-03 9:49 */ public class KafkaCommonProperties { /** * Kafka主机 */ private String kafkaHost = "192.168.211.129:9092"; /** * 生产者:要求leader请求结束前收到的确认次数,来控制发送数据的持久化 * 消息确认: * 0:生产者不等待服务器确认,此时retry参数不生效 * 1:leader写入记录到log,不会等待follower的确认即向生产者发送通知 * all:leader等待所有副本通知,然后向生产者发送通知,保证所有数据落盘到所有副本,功能同设置为-1 */ private String ack = "all"; /** * 生产者重试次数 */ private Integer retryTimes = 1; /** * 生产者:向同一分区发送打包发送的数据量,单位:bytes,默认16384bytes=16K */ private Integer batchSize = 16384; /** * 生产者:批量发送消息的间隔时间(延迟时间),单位:毫秒 */ private Integer lingerMs = 1; /** * 生产者:可以使用的最大缓存空间,单位:bytes,默认33554432bytes=32M. */ private Integer bufferMemory = 33554432; /** * 生产者:键编码器 */ private String keyEncoder = "org.apache.kafka.common.serialization.StringSerializer"; /** * 生产者:值编码器 */ private String valueEncoder = "org.apache.kafka.common.serialization.StringSerializer"; /** * 消费者:消费topic的组ID */ private String groupId = "my-group-id"; /** * 消费者:后台定期提交offset */ private String autoCommit = "true"; /** * 消费者提交offset的时间间隔:单位:毫秒,当enable.auto.commit为true时生效 */ private String autoCommitIntervalMs = "1000"; /** * 消费者:键解码器 */ private String keyDecoder = "org.apache.kafka.common.serialization.StringDeserializer"; /** * 消费者:值解码器 */ private String valueDecoder = "org.apache.kafka.common.serialization.StringDeserializer"; /** * 消费者:重启后配置offset * earliest:消费者恢复到当前topic最早的offset * latest:消费者从最新的offset开始消费 * none:如果消费者组没找到之前的offset抛出异常 * 其他任何值都会抛出异常 */ private String autoOffsetReset = "latest"; /** * TOPIC */ private Collection topic = Collections.singleton("my-topic"); public KafkaCommonProperties() { } public KafkaCommonProperties(String kafkaHost, String ack, Integer retryTimes, Integer batchSize, Integer lingerMs, Integer bufferMemory, String keyEncoder, String valueEncoder, String groupId, String autoCommit, String autoCommitIntervalMs, String keyDecoder, String valueDecoder, String autoOffsetReset, Collection topic) { this.kafkaHost = kafkaHost; this.ack = ack; this.retryTimes = retryTimes; this.batchSize = batchSize; this.lingerMs = lingerMs; this.bufferMemory = bufferMemory; this.keyEncoder = keyEncoder; this.valueEncoder = valueEncoder; this.groupId = groupId; this.autoCommit = autoCommit; this.autoCommitIntervalMs = autoCommitIntervalMs; this.keyDecoder = keyDecoder; this.valueDecoder = valueDecoder; this.autoOffsetReset = autoOffsetReset; this.topic = topic; } // 省略setter和getter及toString() } 4 Code实践 4.1 生产者

构建Kafka数据生产者, 测试样例的配置有:Kafka broker地址,消息确认,重试,批量发送数据,数据键和值的编码器, 重写Callback实现异步生产数据。

4.1.1 生产数据 package com.monkey.java_study.mq.kafka; import org.apache.kafka.clients.producer.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Objects; import java.util.Properties; /** * Kafka生产者. * * @author xindaqi * @since 2022-08-02 9:59 */ public class KafkaProducerTest { private static final Logger logger = LoggerFactory.getLogger(KafkaProducerTest.class); public static KafkaProducer getDefaultKafkaProducer(KafkaCommonProperties kafkaCommonProperties) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCommonProperties.getKafkaHost()); properties.put(ProducerConfig.ACKS_CONFIG, kafkaCommonProperties.getAck()); properties.put(ProducerConfig.RETRIES_CONFIG, kafkaCommonProperties.getRetryTimes()); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaCommonProperties.getBatchSize()); properties.put(ProducerConfig.LINGER_MS_CONFIG, kafkaCommonProperties.getLingerMs()); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaCommonProperties.getBufferMemory()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaCommonProperties.getKeyEncoder()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaCommonProperties.getValueEncoder()); return new KafkaProducer(properties); } static class MyProducerCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (Objects.nonNull(exception)) { logger.error(">>>>>>>>>>Producer生产消息异常:", exception); } if (Objects.nonNull(metadata)) { logger.info(">>>>>>>>>>Producer生产消息:metadata:{},partition:{}, offset:{}", metadata, metadata.partition(), metadata.offset()); } } } public static void main(String[] args) { KafkaCommonProperties kafkaCommonProperties = new KafkaCommonProperties(); KafkaProducer producer = getDefaultKafkaProducer(kafkaCommonProperties); String message = "hello world "; try { for (int i = 0; i


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3